package com.hmc.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Demo of a transactional Kafka producer.
 *
 * <p>Sends the values {@code "hmc0"} through {@code "hmc10"} to the topic {@code "first"}
 * inside a single transaction, committing on success and aborting on failure.
 * The base producer configuration comes from {@code CustomProducer.getProperties()}.
 */
public class CustomProducerTransaction {
    /**
     * Runs the transactional send.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties properties = CustomProducer.getProperties();

        // A transactional id must be configured before transactions can be used.
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_01");

        // try-with-resources closes the producer on every exit path, including a failure
        // in initTransactions() or beginTransaction(), which previously skipped close().
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // Initialize transactions, then open one.
            producer.initTransactions();
            producer.beginTransaction();

            try {
                // Send the records.
                for (int i = 0; i <= 10; i++) {
                    producer.send(new ProducerRecord<>("first", "hmc" + i));
                }
                // To exercise the abort path, throw here, e.g. `int i = 1 / 0;`.

                // Commit the transaction.
                producer.commitTransaction();
            } catch (Exception e) {
                // Report the original failure first: abortTransaction() can itself throw
                // (e.g. when the producer has been fenced), which would otherwise hide it.
                e.printStackTrace();
                // Abort the transaction.
                producer.abortTransaction();
            }
        }
    }
}
